#!/bin/bash
#
# orchestrator-client: a wrapper script for calling upon orchestrator's API
#
# This script serves as a command line client for orchestrator. It talks to orchestrator
# by invoking GET requests on orchestrator's API. It formats and normalizes output, converting from
# JSON format to textual format.
#
# Command line options and output format are intentionally compatible with the CLI variation of
# orchestrator.
#
# With this script, you can conveniently talk to orchestrator without needing to have the
# orchestrator binary, configuration files, database access etc.
#
# Prerequisite:
#   set the ORCHESTRATOR_API variable to point to your orchestrator service.
#   You may specify a single endpoint, like so:
#     export ORCHESTRATOR_API="http://orchestrator.myservice.com:3000/api"
#   Or you may specify multiple endpoints, space delimited, in which case orchestrator-client will
#   iterate all of them and require one of them to satisfy leader-check. This is your way to provide
#   orchestrator-client with all service nodes and let it figure out the identity of the leader by
#   itself, with no need for a proxy. Example:
#     export ORCHESTRATOR_API="http://service1:3000/api http://service2:3000/api http://service3:3000/api"
#
# Usage:
#   orchestrator-client -c <command> [flags...]
# Examples:
#   orchestrator-client -c all-instances
#   orchestrator-client -c which-replicas -i some.master.com:3306
#   orchestrator-client -c which-cluster-instances --alias mycluster
#   orchestrator-client -c replication-analysis
#   orchestrator-client -c register-candidate -i candidate.host.com:3306 --promotion-rule=prefer
#   orchestrator-client -c recover -i failed.host.com:3306

# Optional site-wide environment file: the place to export ORCHESTRATOR_API (and anything
# else this client should see). It is sourced only when it exists.
if [ -f /etc/profile.d/orchestrator-client.sh ] ; then
  . /etc/profile.d/orchestrator-client.sh
fi

# API endpoint(s): one URL or a space delimited list. Taken from the environment,
# with a local default; the -U flag may replace it later.
orchestrator_api="${ORCHESTRATOR_API:-http://localhost:3000}"
# The single endpoint requests are sent to; filled in by detect_leader_api.
leader_api=""

# Values supplied through command line flags (empty means "not given").
command=""
instance=""
destination=""
alias=""
owner="$(whoami | xargs)"   # current user; xargs strips surrounding whitespace
reason=""
duration=10m
promotion_rule=""
pool=""
hostname_flag=""
api_path=""

# host/port forms of $instance and $destination, computed in main via to_hostport.
instance_hostport=""
destination_hostport=""
# Port assumed by to_hostport when an instance is given without one.
default_port=3306

# Result of the most recent API call: the full reply and its .Details member.
api_response=""
api_details=""

# Rewrite long options into the single-letter flags parsed by getopts further down.
#
# Each original argument is shifted off the front of "$@" and re-appended at the back,
# translated or untouched, so one full pass preserves the argument order.
# Both "--name value" and "--name=value" are accepted: the latter is split into the short
# flag followed by the value. Anything that is not a known long option is passed through
# unchanged (including any "=" it contains).
for arg in "$@"; do
  shift
  opt_name="${arg%%=*}"   # text before the first "="; the whole argument when there is none
  opt_flag=
  case "$opt_name" in
    "-help"|"--help")                     opt_flag="-h" ;;
    "-command"|"--command")               opt_flag="-c" ;;
    "-alias"|"--alias")                   opt_flag="-a" ;;
    "-owner"|"--owner")                   opt_flag="-o" ;;
    "-reason"|"--reason")                 opt_flag="-r" ;;
    "-promotion-rule"|"--promotion-rule") opt_flag="-R" ;;
    "-duration"|"--duration")             opt_flag="-u" ;;
    "-pool"|"--pool")                     opt_flag="-l" ;;
    "-hostname"|"--hostname")             opt_flag="-H" ;;
    "-api"|"--api")                       opt_flag="-U" ;;
    "-path"|"--path")                     opt_flag="-P" ;;
  esac
  if [ -z "$opt_flag" ] ; then
    # not a long option we know: keep the argument exactly as given
    set -- "$@" "$arg"
  elif [ "$opt_name" == "$arg" ] ; then
    # "--name" form: the value, if any, is the next argument
    set -- "$@" "$opt_flag"
  else
    # "--name=value" form: emit the flag and the text after the first "="
    set -- "$@" "$opt_flag" "${arg#*=}"
  fi
done

# Parse the single-letter flags into the script's global settings.
# -d and -s both set the destination; -h selects the "help" command.
# Flags not listed in a branch below leave all settings untouched.
while getopts "c:i:d:s:a:D:U:o:r:u:R:l:H:P:h" flag ; do
  case "$flag" in
    h) command="help" ;;
    c) command="$OPTARG" ;;
    i) instance="$OPTARG" ;;
    d|s) destination="$OPTARG" ;;
    a) alias="$OPTARG" ;;
    o) owner="$OPTARG" ;;
    r) reason="$OPTARG" ;;
    u) duration="$OPTARG" ;;
    R) promotion_rule="$OPTARG" ;;
    l) pool="$OPTARG" ;;
    H) hostname_flag="$OPTARG" ;;
    D) default_port="$OPTARG" ;;
    U)
      # an empty -U value keeps whatever endpoint is configured so far
      if [ -n "$OPTARG" ] ; then
        orchestrator_api="$OPTARG"
      fi
      ;;
    P) api_path="$OPTARG" ;;
  esac
done

# fail MESSAGE
# Report a fatal error: writes MESSAGE to stderr and terminates the script with status 1.
fail() {
  echo "$1" >&2
  exit 1
}

# check_requirements: abort (via fail) unless both curl and jq can be invoked.
# Uses the shell builtin `command -v` rather than the external `which`, which is not
# guaranteed to be installed and would otherwise cause a misleading "cannot find curl".
function check_requirements() {
  command -v curl > /dev/null 2>&1 || fail "cannot find curl"
  command -v jq   > /dev/null 2>&1 || fail "cannot find jq"
}

# assert_nonempty NAME VALUE
# Guard used by commands to validate required flags: when VALUE is the empty string the
# script aborts (via fail) with "NAME must be provided"; otherwise nothing happens.
assert_nonempty() {
  [ -n "$2" ] || fail "$1 must be provided"
}

# to_hostport INSTANCE_KEY
# Converts an instance key into the host/port form used in API paths:
# - fqdn:port => fqdn/port   (every ":" becomes "/")
# - fqdn      => fqdn/$default_port
# - empty     => empty line
# The key is handled as a literal string: it is never word-split or glob-expanded.
function to_hostport {
  local instance_key="$1"

  if [ -z "$instance_key" ] ; then
    echo ""
    return
  fi

  if [[ "$instance_key" == *":"* ]]; then
    printf '%s\n' "${instance_key//://}"
  else
    printf '%s\n' "$instance_key/$default_port"
  fi
}

# normalize_orchestrator_api [URL]
# Prints URL (default: $orchestrator_api) in canonical form: without a trailing slash and
# ending in "/api". The URL is treated as a literal string (no word splitting or globbing).
function normalize_orchestrator_api() {
  local endpoint="${1:-$orchestrator_api}"

  endpoint="${endpoint%/}"
  if [[ "$endpoint" != *"/api" ]]; then
    # a second strip copes with a doubled trailing slash before "/api" is appended
    endpoint="${endpoint%/}"
    endpoint="$endpoint/api"
  fi
  printf '%s\n' "$endpoint"
}


# detect_leader_api: decide which endpoint to talk to and store it in $leader_api.
#
# $orchestrator_api may be a single URI (e.g. "http://orchestrator.service/api")
# - in which case we just normalize the URL
# or it may be a space delimited list, such as "http://host1:3000/api http://host2:3000/api http://host3:3000/api "
# - in which case each URL is probed in order with a GET on <url>/leader-check and the
#   first one answering HTTP 200 wins. If none does, the script aborts via fail.
function detect_leader_api() {
  local -a apis
  local candidate leader_check

  leader_api=
  # Deliberately unquoted: word splitting is what turns the list into array elements.
  apis=($orchestrator_api)
  if [ "${#apis[@]}" -eq 1 ] ; then
    leader_api="$(normalize_orchestrator_api "${apis[0]}")"
    return
  fi
  for candidate in "${apis[@]}" ; do
    candidate="$(normalize_orchestrator_api "$candidate")"
    # -m 0.5: give each node at most half a second; only the HTTP status code is captured
    leader_check="$(curl -m 0.5 -s -o /dev/null -w "%{http_code}" "${candidate}/leader-check")"
    if [ "$leader_check" == "200" ] ; then
      leader_api="$candidate"
      return
    fi
  done
  fail "Cannot determine leader from $orchestrator_api"
}

# urlencode TEXT
# Prints TEXT percent-encoded (jq's @uri filter), without a trailing newline.
# TEXT is fed to jq with printf rather than echo: jq -s -R reads its whole input as one
# string, so the newline echo appends would be encoded into the result as a trailing %0A.
function urlencode() {
  local text="$1"
  printf '%s' "$text" | jq -s -R -r @uri | tr -d '\n'
}

# api PATH
# Performs GET $leader_api/PATH and stores the reply, reformatted by jq, in $api_response.
# - curl or jq failing: aborts via fail.
# - reply is a JSON array or string: returns with only $api_response set.
# - reply is an object without "Code": likewise.
# - otherwise $api_details is set to the reply's .Details; when .Code contains "ERROR"
#   the .Message is written to stderr, non-null details to stdout, and the script exits 1.
# The reply is always passed on quoted, so whitespace inside JSON strings is preserved and
# nothing in it is glob-expanded.
function api() {
  local path="$1"
  local uri="$leader_api/$path"
  local response_type

  # NOTE: this stays in effect for the rest of the script, not just for this call.
  set -o pipefail
  api_response=$(curl -s "$uri" | jq '.')
  if [ $? -ne 0 ] ; then
    fail "Cannot access orchestrator at ${leader_api}"
  fi
  response_type="$(printf '%s\n' "$api_response" | jq -r 'type')"
  if [ "$response_type" == "array" ] || [ "$response_type" == "string" ] ; then
    return
  fi
  if [ "$(printf '%s\n' "$api_response" | jq -r 'has("Code")')" == "false" ] ; then
    return
  fi
  api_details=$(printf '%s\n' "$api_response" | jq '.Details')
  if printf '%s\n' "$api_response" | jq -r '.Code' | grep -q "ERROR" ; then
    # xargs echo flattens the message onto one line; single quotes are removed first
    # because xargs would otherwise treat them as quoting characters
    printf '%s\n' "$api_response" | jq -r '.Message' | tr -d "'" | xargs >&2 echo
    [ "$api_details" != "null" ] && printf '%s\n' "$api_details"
    exit 1
  fi
}

# print_response: write the last API reply ($api_response) to stdout exactly as stored.
# The value is quoted so that runs of whitespace inside JSON strings survive and tokens
# such as "*" are not expanded to file names; line breaks in the stored text are kept.
function print_response {
  printf '%s\n' "$api_response"
}

# print_details: write the .Details of the last API reply ($api_details) to stdout exactly
# as stored. The value is quoted so that runs of whitespace inside JSON strings survive
# and tokens such as "*" are not expanded to file names.
function print_details {
  printf '%s\n' "$api_details"
}

# filter_key: stdin filter; emits the top-level "Key" member of the JSON read from stdin.
filter_key() {
  jq '.Key'
}

# filter_master_key: stdin filter; emits the top-level "MasterKey" member of the JSON read
# from stdin.
filter_master_key() {
  jq '.MasterKey'
}

# filter_keys: stdin filter; for a JSON array read from stdin, emits the "Key" member of
# each element.
filter_keys() {
  jq '.[] | .Key'
}

# print_key: stdin filter; renders each JSON object carrying Hostname and Port members
# as plain text "Hostname:Port".
print_key() {
  jq -r '. | (.Hostname + ":" + (.Port | tostring))'
}

# which_api: print the API endpoint in use ($leader_api).
which_api() {
  printf '%s\n' "${leader_api}"
}

# api_call: invoke an arbitrary API path, given with --path, and print the raw reply.
# Aborts when no path was supplied.
api_call() {
  assert_nonempty "path" "${api_path}"
  api "${api_path}"
  print_response
}

# prompt_help: print usage plus the list of available commands.
# The list is scraped from this script's own source ($0): the quoted labels of the
# command dispatch table and the "#" comment that follows each entry become two columns
# ("~" is the temporary column separator handed to column).
# grep -E / sed -E replace the obsolescent egrep and the less portable sed -r.
function prompt_help() {
  echo "Usage: orchestrator-client -c <command> [flags...]"
  echo "Example: orchestrator-client -c which-master -i some.replica"
  echo "Available commands:"
  sed -n '/run_command/,/esac/p' "$0" \
    | grep -E '".*"[)].*;;' \
    | sed -E -e 's/"(.*?)".*#(.*)/\1~\2/' \
    | column -t -s "~"
}

# discover: have orchestrator look up the instance given with -i, then print the
# hostname:port found under .Key in the reply details. Aborts without an instance.
discover() {
  assert_nonempty "instance" "${instance_hostport}"
  api "discover/${instance_hostport}"
  print_details \
    | filter_key \
    | print_key
}

# ascii_topology: print the ASCII graph of a replication topology.
# The topology is identified by --alias when given, otherwise by the raw -i value;
# the reply details are printed as raw text.
ascii_topology() {
  local target="${alias:-$instance}"

  assert_nonempty "instance|alias" "${target}"
  api "topology/${target}"
  print_details | jq -r '.'
}

# instance: print the hostname:port found under .Key in the instance/ reply for the
# instance given with -i. Aborts without an instance.
instance() {
  assert_nonempty "instance" "${instance_hostport}"
  api "instance/${instance_hostport}"
  print_response \
    | filter_key \
    | print_key
}

# which_master: print the hostname:port found under .MasterKey in the instance/ reply
# for the instance given with -i. Aborts without an instance.
which_master() {
  assert_nonempty "instance" "${instance_hostport}"
  api "instance/${instance_hostport}"
  print_response \
    | filter_master_key \
    | print_key
}

# which_replicas: print one hostname:port per element of the instance-replicas/ reply
# for the instance given with -i. Aborts without an instance.
which_replicas() {
  assert_nonempty "instance" "${instance_hostport}"
  api "instance-replicas/${instance_hostport}"
  print_response \
    | filter_keys \
    | print_key
}

# which_cluster: print the ClusterName from the cluster-info/ reply.
# The cluster is identified by --alias when given, otherwise by the raw -i value.
which_cluster() {
  local target="${alias:-$instance}"

  assert_nonempty "instance|alias" "${target}"
  api "cluster-info/${target}"
  print_response | jq -r '.ClusterName'
}

# which_cluster_master: print the writable master of a cluster as hostname:port.
# The cluster is identified by --alias when given, otherwise by the raw -i value.
# The reply's .Key is printed (not .ClusterName, which is a cluster's name rather than
# the master this command is documented to output).
# NOTE(review): assumes master/<hint> returns a single instance object carrying a Key
# member, like instance/ does - confirm against the API.
function which_cluster_master() {
  assert_nonempty "instance|alias" "${alias:-$instance}"
  api "master/${alias:-$instance}"
  print_response | filter_key | print_key
}

# which_cluster_instances: print one hostname:port per element of the cluster/ reply.
# The cluster is identified by --alias when given, otherwise by the raw -i value.
which_cluster_instances() {
  local target="${alias:-$instance}"

  assert_nonempty "instance|alias" "${target}"
  api "cluster/${target}"
  print_response \
    | filter_keys \
    | print_key
}

# all_clusters_masters: print one hostname:port per element of the masters reply.
all_clusters_masters() {
  api "masters"
  print_response \
    | filter_keys \
    | print_key
}

# clusters: print the ClusterName of every element of the clusters-info reply, one per line.
clusters() {
  local name_filter='.[].ClusterName'

  api "clusters-info"
  print_response | jq -r "${name_filter}"
}

# clusters_alias: for every element of the clusters-info reply print "ClusterName,ClusterAlias".
clusters_alias() {
  local pair_filter='.[] | (.ClusterName + "," + .ClusterAlias)'

  api "clusters-info"
  print_response | jq -r "${pair_filter}"
}

# forget: request forget/ for the instance given with -i; prints nothing on success.
# Aborts without an instance.
forget() {
  assert_nonempty "instance" "${instance_hostport}"
  api "forget/${instance_hostport}"
}

# all_instances: print one hostname:port per element of the all-instances reply.
all_instances() {
  api "all-instances"
  print_response \
    | filter_keys \
    | print_key
}

# which_cluster_osc_replicas: print one hostname:port per element of the
# cluster-osc-replicas/ reply. The cluster is identified by --alias when given,
# otherwise by the raw -i value.
which_cluster_osc_replicas() {
  local target="${alias:-$instance}"

  assert_nonempty "instance|alias" "${target}"
  api "cluster-osc-replicas/${target}"
  print_response \
    | filter_keys \
    | print_key
}

# downtimed: print one hostname:port per element of the downtimed/ reply.
# --alias (or else the raw -i value) is appended to the path; neither is required, in
# which case the path is just "downtimed/".
downtimed() {
  local target="${alias:-$instance}"

  api "downtimed/${target}"
  print_response \
    | filter_keys \
    | print_key
}

# dominant_dc: print the DataCenter value that occurs most often in the masters reply.
dominant_dc() {
  api "masters"
  # count occurrences per data center, biggest count first, keep the name of the top one
  print_response \
    | jq -r '.[].DataCenter' \
    | sort \
    | uniq -c \
    | sort -nr \
    | head -n 1 \
    | awk '{print $2}'
}

# submit_pool_instances: submit the pool named by --pool together with its instances.
# Here -i is a comma delimited list, e.g.
#   myinstance1.com:3306,myinstance2.com:3306,myinstance3.com:3306
# and is sent url-encoded as the "instances" query parameter. Prints the reply details.
submit_pool_instances() {
  local encoded_instances

  assert_nonempty "instance" "${instance}"
  assert_nonempty "pool" "${pool}"
  encoded_instances="$(urlencode "${instance}")"
  api "submit-pool-instances/${pool}?instances=${encoded_instances}"
  print_details | jq -r .
}

# which_heuristic_cluster_pool_instances: print one hostname:port per element of the
# heuristic-cluster-pool-instances/ reply details. The cluster is identified by --alias
# when given, otherwise by the raw -i value; --pool is optional and appended as is.
which_heuristic_cluster_pool_instances() {
  local target="${alias:-$instance}"

  assert_nonempty "instance|alias" "${target}"
  api "heuristic-cluster-pool-instances/${target}/${pool}"
  print_details \
    | filter_keys \
    | print_key
}

# begin_downtime: mark the instance given with -i as downtimed.
# Requires instance, owner, reason and duration; owner and reason are url-encoded into
# the request path. Prints the reply details as hostname:port.
begin_downtime() {
  local owner_enc reason_enc

  assert_nonempty "instance" "${instance_hostport}"
  assert_nonempty "owner" "${owner}"
  assert_nonempty "reason" "${reason}"
  assert_nonempty "duration" "${duration}"
  owner_enc="$(urlencode "${owner}")"
  reason_enc="$(urlencode "${reason}")"
  api "begin-downtime/${instance_hostport}/${owner_enc}/${reason_enc}/${duration}"
  print_details | print_key
}

# end_downtime: request end-downtime/ for the instance given with -i and print the reply
# details as hostname:port. Aborts without an instance.
end_downtime() {
  assert_nonempty "instance" "${instance_hostport}"
  api "end-downtime/${instance_hostport}"
  print_details | print_key
}

# begin_maintenance: request a maintenance lock on the instance given with -i.
# Requires instance, owner and reason; owner and reason are url-encoded into the request
# path. Prints the reply details as hostname:port.
begin_maintenance() {
  local owner_enc reason_enc

  assert_nonempty "instance" "${instance_hostport}"
  assert_nonempty "owner" "${owner}"
  assert_nonempty "reason" "${reason}"
  owner_enc="$(urlencode "${owner}")"
  reason_enc="$(urlencode "${reason}")"
  api "begin-maintenance/${instance_hostport}/${owner_enc}/${reason_enc}"
  print_details | print_key
}

# end_maintenance: request end-maintenance/ for the instance given with -i and print the
# reply details as hostname:port. Aborts without an instance.
end_maintenance() {
  assert_nonempty "instance" "${instance_hostport}"
  api "end-maintenance/${instance_hostport}"
  print_details | print_key
}

# register_candidate: set the promotion rule (--promotion-rule) for the instance given
# with -i. Both are required. Prints the reply details as hostname:port.
register_candidate() {
  assert_nonempty "instance" "${instance_hostport}"
  assert_nonempty "promotion-rule" "${promotion_rule}"
  api "register-candidate/${instance_hostport}/${promotion_rule}"
  print_details | print_key
}

# register_hostname_unresolve: assign the name given with --hostname to the instance
# given with -i. Both are required. Prints the reply details as hostname:port.
register_hostname_unresolve() {
  assert_nonempty "instance" "${instance_hostport}"
  assert_nonempty "hostname" "${hostname_flag}"
  api "register-hostname-unresolve/${instance_hostport}/${hostname_flag}"
  print_details | print_key
}

# deregister_hostname_unresolve: request deregister-hostname-unresolve/ for the instance
# given with -i and print the reply details as hostname:port. Aborts without an instance.
deregister_hostname_unresolve() {
  assert_nonempty "instance" "${instance_hostport}"
  api "deregister-hostname-unresolve/${instance_hostport}"
  print_details | print_key
}

# general_singular_relocate_command [API_NAME]
# Shared handler for relocation commands that take only an instance. API_NAME defaults
# to the current $command. Prints "<instance key><<master key>" (both as hostname:port),
# taken from .Key and .MasterKey of the reply details.
general_singular_relocate_command() {
  local api_name="${1:-$command}"
  local moved_key master_key

  assert_nonempty "instance" "${instance_hostport}"
  api "${api_name}/${instance_hostport}"
  moved_key="$(print_details | filter_key | print_key)"
  master_key="$(print_details | filter_master_key | print_key)"
  echo "${moved_key}<${master_key}"
}

# general_relocate_command [API_NAME]
# Shared handler for relocation commands that take an instance and a destination.
# API_NAME defaults to the current $command. Prints "<instance key><<master key>"
# (both as hostname:port), taken from .Key and .MasterKey of the reply details.
general_relocate_command() {
  local api_name="${1:-$command}"
  local moved_key master_key

  assert_nonempty "instance" "${instance_hostport}"
  assert_nonempty "destination" "${destination_hostport}"
  api "${api_name}/${instance_hostport}/${destination_hostport}"
  moved_key="$(print_details | filter_key | print_key)"
  master_key="$(print_details | filter_master_key | print_key)"
  echo "${moved_key}<${master_key}"
}

# general_singular_relocate_replicas_command [API_NAME]
# Shared handler for commands that act on the replicas of one instance and take no
# destination. API_NAME defaults to the current $command. Prints one hostname:port per
# element of the reply details.
# The request path consists of the command and the instance only: the destination is not
# required by this function and is therefore not appended (previously it was, producing a
# trailing "/" or, when -d was given, an extra path segment).
function general_singular_relocate_replicas_command() {
  local api_name="${1:-$command}"

  assert_nonempty "instance" "$instance_hostport"
  api "${api_name}/$instance_hostport"
  print_details | filter_keys | print_key
}

# general_relocate_replicas_command [API_NAME]
# Shared handler for commands that move the replicas of an instance below a destination.
# API_NAME defaults to the current $command. Both instance and destination are required.
# Prints one hostname:port per element of the reply details.
# The asserted values are quoted, like everywhere else in this script, so that exactly
# the value later used in the request path is the one being validated.
function general_relocate_replicas_command() {
  local api_name="${1:-$command}"

  assert_nonempty "instance" "$instance_hostport"
  assert_nonempty "destination" "$destination_hostport"
  api "${api_name}/$instance_hostport/$destination_hostport"
  print_details | filter_keys | print_key
}

# relocate: request relocate/ for the instance given with -i below the destination given
# with -d. Both are required. Prints "<instance key><<master key>" (both as
# hostname:port), taken from .Key and .MasterKey of the reply details.
relocate() {
  local moved_key master_key

  assert_nonempty "instance" "${instance_hostport}"
  assert_nonempty "destination" "${destination_hostport}"
  api "relocate/${instance_hostport}/${destination_hostport}"
  moved_key="$(print_details | filter_key | print_key)"
  master_key="$(print_details | filter_master_key | print_key)"
  echo "${moved_key}<${master_key}"
}

# relocate_replicas: request relocate-replicas/ to move the replicas of the instance
# given with -i below the destination given with -d. Both are required. Prints one
# hostname:port per element of the reply details.
# The asserted values are quoted, like everywhere else in this script, so that exactly
# the value later used in the request path is the one being validated.
function relocate_replicas() {
  assert_nonempty "instance" "$instance_hostport"
  assert_nonempty "destination" "$destination_hostport"
  api "relocate-replicas/$instance_hostport/$destination_hostport"
  print_details | filter_keys | print_key
}

# general_instance_command [API_NAME]
# Shared handler for commands that act on a single instance. API_NAME defaults to the
# current $command. Prints the hostname:port found under .Key in the reply details.
general_instance_command() {
  local api_name="${1:-$command}"

  assert_nonempty "instance" "${instance_hostport}"
  api "${api_name}/${instance_hostport}"
  print_details \
    | filter_key \
    | print_key
}

# replication_analysis: for every element of the replication-analysis reply details
# print "host:port (cluster <name>): <analysis>".
replication_analysis() {
  local line_format='.[] | (.AnalyzedInstanceKey.Hostname + ":" + (.AnalyzedInstanceKey.Port | tostring) + " (cluster " + .ClusterDetails.ClusterName + "): ") + .Analysis'

  api "replication-analysis"
  print_details | jq -r "${line_format}"
}

# recover: request recover/ for the instance given with -i and print the reply details
# as hostname:port. Aborts without an instance.
recover() {
  assert_nonempty "instance" "${instance_hostport}"
  api "recover/${instance_hostport}"
  print_details | print_key
}

# graceful_master_takeover: request graceful-master-takeover/ for a cluster and print the
# .SuccessorKey of the reply details as hostname:port. The cluster is identified by
# --alias when given, otherwise by the raw -i value.
graceful_master_takeover() {
  local target="${alias:-$instance}"

  assert_nonempty "instance|alias" "${target}"
  api "graceful-master-takeover/${target}"
  print_details \
    | jq '.SuccessorKey' \
    | print_key
}

# force_master_failover: request force-master-failover/ for a cluster and print the
# .SuccessorKey of the reply details as hostname:port. The cluster is identified by
# --alias when given, otherwise by the raw -i value.
force_master_failover() {
  local target="${alias:-$instance}"

  assert_nonempty "instance|alias" "${target}"
  api "force-master-failover/${target}"
  print_details \
    | jq '.SuccessorKey' \
    | print_key
}

# ack_cluster_recoveries: acknowledge recoveries for a cluster, with --reason as comment.
# The cluster is identified by --alias when given, otherwise by the raw -i value; a
# reason is required. Prints the reply details.
# The reason is passed to urlencode quoted, so the complete text - not just its first
# word - ends up in the "comment" query parameter.
function ack_cluster_recoveries() {
  assert_nonempty "instance|alias" "${alias:-$instance}"
  assert_nonempty "reason" "$reason"
  api "ack-recovery/cluster/${alias:-$instance}?comment=$(urlencode "$reason")"
  print_details
}

# disable_global_recoveries: request disable-global-recoveries and print the reply
# details as raw text.
disable_global_recoveries() {
  api "disable-global-recoveries"
  print_details \
    | jq -r .
}

# enable_global_recoveries: request enable-global-recoveries and print the reply details
# as raw text.
enable_global_recoveries() {
  api "enable-global-recoveries"
  print_details \
    | jq -r .
}

# check_global_recoveries: request check-global-recoveries and print the reply details
# as raw text.
check_global_recoveries() {
  api "check-global-recoveries"
  print_details \
    | jq -r .
}

# raft_leader: print the raft-leader reply as raw text.
# The raft-state reply is checked first; unless it contains "Leader" the script aborts
# with "Cannot determine raft state".
raft_leader() {
  api "raft-state"
  if ! print_response | jq -r . | grep -q Leader ; then
    fail "Cannot determine raft state"
  fi
  # confirmed raft is running well
  api "raft-leader"
  print_response | jq -r '.'
}

# raft_leader_hostname: print the .Hostname from the details of the status reply.
# The raft-state reply is checked first; unless it contains "Leader" the script aborts
# with "Cannot determine raft state".
raft_leader_hostname() {
  api "raft-state"
  if ! print_response | jq -r . | grep -q Leader ; then
    fail "Cannot determine raft state"
  fi
  # confirmed raft is running well
  api "status"
  print_details | jq -r '.Hostname'
}

# raft_elect_leader: request raft-yield-hint/ with --hostname as the hint for the new
# leader and print the reply details as raw text. A hostname is required.
# The raft-state reply is checked first; unless it contains "Leader" the script aborts
# with "Cannot determine raft state".
raft_elect_leader() {
  assert_nonempty "hostname" "${hostname_flag}"
  api "raft-state"
  if ! print_response | jq -r . | grep -q Leader ; then
    fail "Cannot determine raft state"
  fi
  # confirmed raft is running well
  api "raft-yield-hint/${hostname_flag}"
  print_details | jq -r .
}

# run_command: dispatch $command to the function implementing it.
# Aborts when no command was given or when the command is unknown.
#
# The first "slave" in the command name is rewritten to "replica", so legacy spellings
# (e.g. which-slaves) reach the "-replica" entries below. The five commands whose API
# name itself contains "slave" are therefore matched in their rewritten form too and hand
# their original name to general_instance_command explicitly; without this they could
# never match and were always rejected as unsupported.
# NOTE(review): the explicit names assume the API paths are the "-slave" spellings these
# entries were originally labelled with - confirm against the server.
#
# The help command lists commands by scraping this table from the script source: keep
# every entry on one line as: quoted label(s), handler, terminator, "#" and description.
# Arguments to handlers are left unquoted so the label column is extracted cleanly.
function run_command() {
  if [ -z "$command" ] ; then
    fail "No command given. Use -c or --command"
  fi
  command="${command/slave/replica}"
  case "$command" in
    "help") prompt_help ;; # Show available commands

    "which-api") which_api ;; # Output the HTTP API to be used
    "api") api_call ;;        # Invoke any API request; provide --path argument

    "discover") discover ;;   # Lookup an instance, investigate it
    "forget") forget ;;       # Forget about an instance's existence

    "topology") ascii_topology ;;                               # Show an ascii-graph of a replication topology, given a member of that topology
    "clusters") clusters ;;                                     # List all clusters known to orchestrator
    "clusters-alias") clusters_alias ;;                         # List all clusters known to orchestrator
    "instance"|"which-instance") instance ;;                    # Output the fully-qualified hostname:port representation of the given instance, or error if unknown
    "which-master") which_master ;;                             # Output the fully-qualified hostname:port representation of a given instance's master
    "which-replicas") which_replicas ;;                         # Output the fully-qualified hostname:port list of replicas of a given instance
    "which-cluster-instances") which_cluster_instances ;;       # Output the list of instances participating in same cluster as given instance
    "which-cluster") which_cluster ;;                           # Output the name of the cluster an instance belongs to, or error if unknown to orchestrator
    "which-cluster-master") which_cluster_master ;;             # Output the name of a writable master in given cluster
    "all-clusters-masters") all_clusters_masters ;;             # List of writeable masters, one per cluster
    "all-instances") all_instances ;;                           # The complete list of known instances
    "which-cluster-osc-replicas") which_cluster_osc_replicas ;; # Output a list of replicas in a cluster, that could serve as a pt-online-schema-change operation control replicas
    "downtimed") downtimed ;;                                   # List all downtimed instances
    "dominant-dc") dominant_dc ;;                               # Name the data center where most masters are found

    "relocate") general_relocate_command ;;                   # Relocate a replica beneath another instance
    "relocate-replicas") general_relocate_replicas_command ;; # Relocates all or part of the replicas of a given instance under another instance

    "match") general_relocate_command ;;                               # Matches a replica beneath another (destination) instance using Pseudo-GTID
    "match-up") general_singular_relocate_command ;;                   # Transport the replica one level up the hierarchy, making it child of its grandparent, using Pseudo-GTID
    "match-up-replicas") general_singular_relocate_replicas_command ;; # Matches replicas of the given instance one level up the topology, making them siblings of given instance, using Pseudo-GTID

    "move-up") general_singular_relocate_command ;;                    # Move a replica one level up the topology
    "move-below") general_relocate_command ;;                          # Moves a replica beneath its sibling. Both replicas must be actively replicating from same master.
    "move-equivalent") general_relocate_command ;;                     # Moves a replica beneath another server, based on previously recorded "equivalence coordinates"
    "move-up-replicas") general_singular_relocate_replicas_command ;;  # Moves replicas of the given instance one level up the topology
    "make-co-master") general_singular_relocate_command ;;             # Create a master-master replication. Given instance is a replica which replicates directly from a master.
    "take-master") general_singular_relocate_command ;;                # Turn an instance into a master of its own master; essentially switch the two.
    "take-siblings") general_singular_relocate_command ;;              # Turn all siblings of a replica into its sub-replicas.

    "move-gtid") general_relocate_command ;;                           # Move a replica beneath another instance via GTID
    "move-replicas-gtid") general_relocate_replicas_command ;;         # Moves all replicas of a given instance under another (destination) instance using GTID

    "repoint") general_relocate_command ;;                             # Make the given instance replicate from another instance without changing the binglog coordinates. Use with care
    "repoint-replicas") general_singular_relocate_replicas_command ;;  # Repoint all replicas of given instance to replicate back from the instance. Use with care

    "submit-pool-instances") submit_pool_instances ;;                  # Submit a pool name with a list of instances in that pool
    "which-heuristic-cluster-pool-instances") which_heuristic_cluster_pool_instances ;; # List instances of a given cluster which are in either any pool or in a specific pool

    "begin-downtime") begin_downtime ;;                               # Mark an instance as downtimed
    "end-downtime") end_downtime ;;                                   # Indicate an instance is no longer downtimed
    "begin-maintenance") begin_maintenance ;;                         # Request a maintenance lock on an instance
    "end-maintenance") end_maintenance ;;                             # Remove maintenance lock from an instance
    "register-candidate") register_candidate ;;                       # Indicate the promotion rule for a given instance
    "register-hostname-unresolve") register_hostname_unresolve ;;     # Assigns the given instance a virtual (aka "unresolved") name
    "deregister-hostname-unresolve") deregister_hostname_unresolve ;; # Explicitly deregister/dosassociate a hostname with an "unresolved" name

    "stop-slave"|"stop-replica") general_instance_command stop-slave ;;                # Issue a STOP SLAVE on an instance
    "stop-slave-nice"|"stop-replica-nice") general_instance_command stop-slave-nice ;; # Issue a STOP SLAVE on an instance, make effort to stop such that SQL thread is in sync with IO thread (ie all relay logs consumed)
    "start-slave"|"start-replica") general_instance_command start-slave ;;             # Issue a START SLAVE on an instance
    "restart-slave"|"restart-replica") general_instance_command restart-slave ;;       # Issue STOP and START SLAVE on an instance
    "reset-slave"|"reset-replica") general_instance_command reset-slave ;;             # Issues a RESET SLAVE command; use with care
    "detach-replica") general_instance_command ;;               # Stops replication and modifies binlog position into an impossible yet reversible value.
    "reattach-replica") general_instance_command ;;             # Undo a detach-replica operation
    "detach-replica-master-host") general_instance_command ;;   # Stops replication and modifies Master_Host into an impossible yet reversible value.
    "reattach-replica-master-host") general_instance_command ;; # Undo a detach-replica-master-host operation
    "skip-query") general_instance_command ;;                   # Skip a single statement on a replica; either when running with GTID or without

    "set-read-only") general_instance_command ;;     # Turn an instance read-only, via SET GLOBAL read_only := 1
    "set-writeable") general_instance_command ;;     # Turn an instance writeable, via SET GLOBAL read_only := 0
    "flush-binary-logs") general_instance_command ;; # Flush binary logs on an instance

    "recover") recover ;;                                     # Do auto-recovery given a dead instance, assuming orchestrator agrees there's a problem. Override blocking.
    "graceful-master-takeover") graceful_master_takeover ;;   # Gracefully discard master and promote another (direct child) instance instead, even if everything is running well
    "force-master-failover") force_master_failover ;;         # Forcibly discard master and initiate a failover, even if orchestrator doesn't see a problem. This command lets orchestrator choose the replacement master
    "ack-cluster-recoveries") ack_cluster_recoveries ;;       # Acknowledge recoveries for a given cluster; this unblocks pending future recoveries
    "disable-global-recoveries") disable_global_recoveries ;; # Disallow orchestrator from performing recoveries globally
    "enable-global-recoveries") enable_global_recoveries ;;   # Allow orchestrator to perform recoveries globally
    "check-global-recoveries") check_global_recoveries ;;     # Show the global recovery configuration

    "replication-analysis") replication_analysis ;;           # Request an analysis of potential crash incidents in all known topologies

    "raft-leader") raft_leader ;;                   # Get identify of raft leader, assuming raft setup
    "raft-leader-hostname") raft_leader_hostname ;; # Get hostname of raft leader, assuming raft setup
    "raft-elect-leader") raft_elect_leader ;;       # Request raft re-elections, provide hint for new leader's identity

    *) fail "Unsupported command $command" ;;
  esac
}

# main: verify the required tools, pick the API endpoint, derive the host/port forms of
# the -i and -d values, then dispatch the requested command.
main() {
  check_requirements
  detect_leader_api

  # $instance and $destination are expanded unquoted, exactly as before: they undergo
  # word splitting, so to_hostport only ever receives their first word (or nothing).
  instance_hostport=$(to_hostport $instance)
  destination_hostport=$(to_hostport $destination)

  run_command
}

# Entry point. The command line flags were already parsed at file scope above, so main is
# called without arguments.
main
